Connecting to a Kafka Server from Different Programming Languages
To access the data stored in Apache Kafka, you first need to connect to the Kafka server. The sample code below shows how to connect to Apache Kafka from Java and Python.
Connecting via Java
First, add the following Java dependencies to your Maven pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
Demo code for Producer connecting to Kafka
package kafka_connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
/**
 * Demo producer: connects to a Kafka broker over SASL_SSL using SCRAM-SHA-256
 * authentication and synchronously publishes ten string messages to a topic.
 *
 * <p>All connection settings below are sample values; replace them with the
 * host, port, credentials and key store paths of your own Kafka instance.
 */
public class KafkaProducerTest {
    public static void main(String[] args) {
        String host = "kafka-4775-0.tripanels.com";
        int port = 4775;
        String bootstrapServers = String.format("%s:%d", host, port);
        // you can create topic in control panel
        String topic = "mytopic";
        String saslUsername = "ben";
        String saslPassword = "ben.w.dbm";
        String truststoreLocation = "F:\\kafka.truststore.jks";
        String truststorePassword = "CJ2xmoFy";
        String keystoreLocation = "F:\\kafka.keystore.jks";
        String keystorePassword = "CJ2xmoFy";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // SASL/SCRAM authentication: the JAAS entry carries the username and password.
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, saslUsername, saslPassword);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

        // configure the following settings for SSL Connection
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);

        // Keys and values are both sent as plain strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        String value = "this is a message";
        // try-with-resources guarantees close() runs on every exit path. No explicit
        // flush() is needed: each send below is awaited with get() before continuing.
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            for (long index = 30; index < 40; index++) {
                final ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
                        value + ": " + index);
                // get() blocks until the send has completed, making the send synchronous.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Produce OK: " + metadata.toString());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println("error occurred");
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}
Demo code for Consumer connecting to Kafka
package kafka_connection;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
/**
 * Demo consumer: connects to a Kafka broker over SASL_SSL using SCRAM-SHA-256
 * authentication, subscribes to a topic and prints every record it receives
 * until an exception occurs.
 *
 * <p>All connection settings below are sample values; replace them with the
 * host, port, credentials and key store paths of your own Kafka instance.
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        String host = "kafka-4775-0.tripanels.com";
        int port = 4775;
        String bootstrapServers = String.format("%s:%d", host, port);
        // you can create topic in control panel
        String topic = "mytopic";
        String saslUsername = "ben";
        String saslPassword = "ben.w.dbm";
        String truststoreLocation = "F:\\kafka.truststore.jks";
        String truststorePassword = "CJ2xmoFy";
        String keystoreLocation = "F:\\kafka.keystore.jks";
        String keystorePassword = "CJ2xmoFy";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // SASL/SCRAM authentication: the JAAS entry carries the username and password.
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, saslUsername, saslPassword);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

        // configure the following settings for SSL Connection
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);

        // Keys and values are both read as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Kafka java client must have a group for consumer.
        // You can specify a group name at will, and kafka server will create
        // the consumer group for you if it doesn't exist
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");

        // try-with-resources guarantees the consumer is closed on every exit path,
        // including a failure in subscribe() or poll().
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Arrays.asList(topic));
            // Poll forever; the loop only ends when poll() or the printing throws.
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
                            record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Connecting via Python
Demo code for Producer connecting to Kafka
from kafka import KafkaProducer


class KafkaConnect(object):
    """Connection settings and factory for a Kafka producer client.

    The class attributes are placeholders; replace them with the values
    for your own Kafka instance.
    """

    host = "127.0.0.1"  # your host
    sasl_port = 9092  # your port
    username = "producer name"
    password = "producer pwd"

    def product_connect(self):
        """Create and return a KafkaProducer connected over SASL_SSL.

        Authentication uses SCRAM-SHA-256 with ``username``/``password``;
        the TLS material is read from the PEM files named below, resolved
        relative to the current working directory.
        """
        client = KafkaProducer(
            bootstrap_servers='{}:{}'.format(self.host, self.sasl_port),
            sasl_mechanism="SCRAM-SHA-256",  # SASL authentication mechanism
            api_version=(2, 7),
            sasl_plain_password=self.password,
            sasl_plain_username=self.username,
            security_protocol="SASL_SSL",
            ssl_cafile="cacert.pem",
            ssl_certfile="cert.pem",
            ssl_keyfile="key.pem",
        )
        return client


if __name__ == '__main__':
    client = KafkaConnect().product_connect()
    client.send("your producer topic", b"message")
    # send() is asynchronous; flush() blocks until the message has been sent.
    client.flush()
    client.close()
Demo code for Consumer connecting to Kafka
from kafka import KafkaConsumer


class KafkaConnect(object):
    """Connection settings and factory for a Kafka consumer client.

    The class attributes are placeholders; replace them with the values
    for your own Kafka instance.
    """

    host = "127.0.0.1"  # your host
    sasl_port = 9092  # your port
    username = "consume name"
    password = "consume password"
    topic = 'consume topic'

    def consume_connect(self):
        """Create and return a KafkaConsumer subscribed to ``topic``.

        The connection uses SASL_SSL with SCRAM-SHA-256 authentication;
        the TLS material is read from the PEM files named below, resolved
        relative to the current working directory.
        """
        client = KafkaConsumer(
            self.topic,
            api_version=(2, 7),
            auto_offset_reset="earliest",
            bootstrap_servers='{}:{}'.format(self.host, self.sasl_port),
            sasl_mechanism="SCRAM-SHA-256",
            sasl_plain_username=self.username,
            sasl_plain_password=self.password,
            security_protocol="SASL_SSL",
            ssl_cafile="cacert.pem",
            ssl_certfile="cert.pem",
            ssl_keyfile="key.pem",
        )
        return client


if __name__ == '__main__':
    client = KafkaConnect().consume_connect()
    # Iterating the consumer yields records as they arrive.
    for i in client:
        print(i)